#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/misc/ValidatorSite.h>
#include <xrpld/app/rdb/RelationalDatabase.h>
#include <xrpld/app/rdb/Wallet.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/ConnectAttempt.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/TrafficCount.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/overlay/predicates.h>
#include <xrpld/peerfinder/make_Manager.h>
#include <xrpld/rpc/handlers/GetCounts.h>
#include <xrpld/rpc/json_body.h>

#include <xrpl/basics/base64.h>
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/basics/random.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/server/SimpleWriter.h>

#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/executor_work_guard.hpp>

namespace ripple {

// Bit flags selecting which sections appear in the "/crawl" response.
// processCrawl() tests each flag against setup_.crawlOptions.
namespace CrawlOptions {
enum {
    // No section enabled: processCrawl() declines to handle the request.
    Disabled = 0,
    // Adds the "overlay" section (getOverlayInfo).
    Overlay = (1 << 0),
    // Adds the "server" section (getServerInfo).
    ServerInfo = (1 << 1),
    // Adds the "counts" section (getServerCounts).
    ServerCounts = (1 << 2),
    // Adds the "unl" section (getUnlInfo).
    Unl = (1 << 3)
};
}

//------------------------------------------------------------------------------

// Binds the child to the overlay that owns it; the reference is used again
// by the destructor to deregister.
OverlayImpl::Child::Child(OverlayImpl& overlay) : overlay_(overlay)
{
}

// Notifies the owning overlay that this child is going away.
// NOTE(review): the remove(Child&) overload is not visible in this part of the
// file; presumably it erases the child from list_, which stop() waits on.
OverlayImpl::Child::~Child()
{
    overlay_.remove(*this);
}

//------------------------------------------------------------------------------

// Creates the periodic overlay timer. The underlying asio timer is bound to
// the overlay's io_context; it is not armed until async_wait() is called.
OverlayImpl::Timer::Timer(OverlayImpl& overlay)
    : Child(overlay), timer_(overlay_.io_context_)
{
}

// Stops the periodic timer: marks it as stopping so that on_timer() will not
// re-arm it, then cancels any wait currently outstanding.
void
OverlayImpl::Timer::stop()
{
    // This method is only ever called from the same strand that calls
    // Timer::on_timer, ensuring they never execute concurrently.
    stopping_ = true;
    timer_.cancel();
}

void
OverlayImpl::Timer::async_wait()
{
    timer_.expires_after(std::chrono::seconds(1));
    timer_.async_wait(boost::asio::bind_executor(
        overlay_.strand_,
        std::bind(
            &Timer::on_timer, shared_from_this(), std::placeholders::_1)));
}

// Timer completion handler. Runs the once-per-second overlay housekeeping
// and re-arms the timer, unless the wait failed or the timer is stopping.
void
OverlayImpl::Timer::on_timer(error_code ec)
{
    if (ec)
    {
        // A cancelled wait is expected during shutdown and is not reported.
        if (ec != boost::asio::error::operation_aborted)
        {
            JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
        }
        return;
    }

    if (stopping_)
        return;

    auto& overlay = overlay_;

    overlay.m_peerFinder->once_per_second();
    overlay.sendEndpoints();
    overlay.autoConnect();

    if (overlay.app_.config().TX_REDUCE_RELAY_ENABLE)
    {
        overlay.sendTxQueue();
    }

    // Idle peers are only swept every Tuning::checkIdlePeers ticks.
    auto const tick = ++overlay.timer_count_;
    if ((tick % Tuning::checkIdlePeers) == 0)
    {
        overlay.deleteIdlePeers();
    }

    async_wait();
}

//------------------------------------------------------------------------------

// Constructs the overlay and wires it to its collaborators.
//
// The constructor only stores references and builds helper objects (the
// PeerFinder manager, the strand, the metrics gauges); no I/O is started
// here. Peer discovery and the periodic timer are started by start().
OverlayImpl::OverlayImpl(
    Application& app,
    Setup const& setup,
    ServerHandler& serverHandler,
    Resource::Manager& resourceManager,
    Resolver& resolver,
    boost::asio::io_context& io_context,
    BasicConfig const& config,
    beast::insight::Collector::ptr const& collector)
    : app_(app)
    , io_context_(io_context)
    // Holds a work guard on the io_context for as long as work_ is engaged.
    , work_(std::in_place, boost::asio::make_work_guard(io_context_))
    , strand_(boost::asio::make_strand(io_context_))
    , setup_(setup)
    , journal_(app_.journal("Overlay"))
    , serverHandler_(serverHandler)
    , m_resourceManager(resourceManager)
    , m_peerFinder(PeerFinder::make_Manager(
          io_context,
          stopwatch(),
          app_.journal("PeerFinder"),
          config,
          collector))
    , m_resolver(resolver)
    // Peer ids handed out by onHandoff()/connect() start at 1.
    , next_id_(1)
    , timer_count_(0)
    , slots_(app.logs(), *this, app.config())
    , m_stats(
          std::bind(&OverlayImpl::collect_metrics, this),
          collector,
          // Immediately-invoked lambda: builds one TrafficGauges entry per
          // traffic category reported by m_traffic, keyed by category.
          [counts = m_traffic.getCounts(), collector]() {
              std::unordered_map<TrafficCount::category, TrafficGauges> ret;

              for (auto const& pair : counts)
                  ret.emplace(
                      pair.first, TrafficGauges(pair.second.name, collector));

              return ret;
          }())
{
    // Expose the PeerFinder as a child source of this overlay's property
    // stream.
    beast::PropertyStream::Source::add(m_peerFinder.get());
}

// Handles an inbound HTTP request handed over by the server.
//
// The request is first offered to the built-in HTTP endpoints (see
// processRequest). Otherwise, if it is a peer protocol upgrade, the peer
// handshake is validated and, on success, a PeerImp is created, registered
// and started.
//
// @param stream_ptr      The connection's stream; moved into the new PeerImp
//                        only on the success path.
// @param request         The HTTP request that was received.
// @param remote_endpoint The address of the remote party.
// @return A Handoff describing the outcome: `response` is set when an HTTP
//         reply should be sent, `moved` is set when the overlay has taken
//         over the connection.
Handoff
OverlayImpl::onHandoff(
    std::unique_ptr<stream_type>&& stream_ptr,
    http_request_type&& request,
    endpoint_type remote_endpoint)
{
    // Every handoff consumes a peer id, even if no peer is created; the id
    // is used as the log prefix for this connection.
    auto const id = next_id_++;
    beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
    beast::Journal journal(sink);

    Handoff handoff;
    // Built-in endpoints (/crawl, /vl/..., /health) take precedence.
    if (processRequest(request, handoff))
        return handoff;
    // Not a peer upgrade: return the untouched handoff so the caller can
    // deal with the request.
    if (!isPeerUpgrade(request))
        return handoff;

    handoff.moved = true;

    JLOG(journal.debug()) << "Peer connection upgrade from " << remote_endpoint;

    error_code ec;
    auto const local_endpoint(
        stream_ptr->next_layer().socket().local_endpoint(ec));
    if (ec)
    {
        // NOTE(review): returns with moved == true and no response although
        // stream_ptr has not been moved from; presumably this makes the
        // caller drop the connection - confirm.
        JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
        return handoff;
    }

    auto consumer = m_resourceManager.newInboundEndpoint(
        beast::IPAddressConversion::from_asio(remote_endpoint));
    // Same as above: moved stays true when the resource manager asks us to
    // disconnect this endpoint.
    if (consumer.disconnect(journal))
        return handoff;

    auto const [slot, result] = m_peerFinder->new_inbound_slot(
        beast::IPAddressConversion::from_asio(local_endpoint),
        beast::IPAddressConversion::from_asio(remote_endpoint));

    if (slot == nullptr)
    {
        // connection refused either IP limit exceeded or self-connect
        handoff.moved = false;
        JLOG(journal.debug())
            << "Peer " << remote_endpoint << " refused, " << to_string(result);
        return handoff;
    }

    // Validate HTTP request

    {
        // The "Connect-As" header must list "peer" (case-insensitive);
        // otherwise the client is answered with a redirect response.
        // NOTE(review): unlike the failure paths below, this path returns
        // without calling m_peerFinder->on_closed(slot) - confirm that the
        // slot is released elsewhere.
        auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
        if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
                return boost::iequals(s, "peer");
            }) == types.end())
        {
            handoff.moved = false;
            handoff.response =
                makeRedirectResponse(slot, request, remote_endpoint.address());
            handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
            return handoff;
        }
    }

    // Pick a protocol version from those offered in the "Upgrade" header.
    auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
    if (!negotiatedVersion)
    {
        m_peerFinder->on_closed(slot);
        handoff.moved = false;
        handoff.response = makeErrorResponse(
            slot,
            request,
            remote_endpoint.address(),
            "Unable to agree on a protocol version");
        handoff.keep_alive = false;
        return handoff;
    }

    // Derive the shared value for this session from the stream; it is an
    // input to verifyHandshake below.
    auto const sharedValue = makeSharedValue(*stream_ptr, journal);
    if (!sharedValue)
    {
        m_peerFinder->on_closed(slot);
        handoff.moved = false;
        handoff.response = makeErrorResponse(
            slot,
            request,
            remote_endpoint.address(),
            "Incorrect security cookie");
        handoff.keep_alive = false;
        return handoff;
    }

    try
    {
        // Throws on a bad handshake; handled by the catch block below.
        auto publicKey = verifyHandshake(
            request,
            *sharedValue,
            setup_.networkID,
            setup_.public_ip,
            remote_endpoint.address(),
            app_);

        consumer.setPublicKey(publicKey);

        {
            // The node gets a reserved slot if it is in our cluster
            // or if it has a reservation.
            bool const reserved =
                static_cast<bool>(app_.cluster().member(publicKey)) ||
                app_.peerReservations().contains(publicKey);
            auto const result =
                m_peerFinder->activate(slot, publicKey, reserved);
            if (result != PeerFinder::Result::success)
            {
                // The slot could not be activated: release it and answer
                // with a redirect response instead.
                m_peerFinder->on_closed(slot);
                JLOG(journal.debug()) << "Peer " << remote_endpoint
                                      << " redirected, " << to_string(result);
                handoff.moved = false;
                handoff.response = makeRedirectResponse(
                    slot, request, remote_endpoint.address());
                handoff.keep_alive = false;
                return handoff;
            }
        }

        // From here on the request and the stream are handed to the peer.
        auto const peer = std::make_shared<PeerImp>(
            app_,
            id,
            slot,
            std::move(request),
            publicKey,
            *negotiatedVersion,
            consumer,
            std::move(stream_ptr),
            *this);
        {
            // As we are not on the strand, run() must be called
            // while holding the lock, otherwise new I/O can be
            // queued after a call to stop().
            std::lock_guard<decltype(mutex_)> lock(mutex_);
            {
                auto const result = m_peers.emplace(peer->slot(), peer);
                XRPL_ASSERT(
                    result.second,
                    "ripple::OverlayImpl::onHandoff : peer is inserted");
                (void)result.second;
            }
            list_.emplace(peer.get(), peer);

            peer->run();
        }
        handoff.moved = true;
        return handoff;
    }
    catch (std::exception const& e)
    {
        JLOG(journal.debug()) << "Peer " << remote_endpoint
                              << " fails handshake (" << e.what() << ")";

        // Release the slot and report the failure text to the client.
        m_peerFinder->on_closed(slot);
        handoff.moved = false;
        handoff.response = makeErrorResponse(
            slot, request, remote_endpoint.address(), e.what());
        handoff.keep_alive = false;
        return handoff;
    }
}

//------------------------------------------------------------------------------

// Returns true when the request is an HTTP upgrade whose "Upgrade" header
// contains at least one parseable peer protocol version.
bool
OverlayImpl::isPeerUpgrade(http_request_type const& request)
{
    // The header is only parsed for genuine upgrade requests.
    return is_upgrade(request) &&
        !parseProtocolVersions(request["Upgrade"]).empty();
}

// Builds the log prefix for a peer id: the id zero-padded to at least three
// digits, wrapped in square brackets and followed by a space, e.g. "[007] ".
std::string
OverlayImpl::makePrefix(std::uint32_t id)
{
    std::stringstream prefix;
    prefix << "[";
    prefix << std::setfill('0') << std::setw(3) << id;
    prefix << "] ";
    return prefix.str();
}

// Builds a "503 Service Unavailable" JSON response whose body carries a
// "peer-ips" array with the addresses the PeerFinder suggests for the slot.
//
// @param slot           The slot the redirect addresses are looked up for.
// @param request        The request being answered (supplies HTTP version).
// @param remote_address Echoed back in the "Remote-Address" header.
// @return A writer that serializes the response.
std::shared_ptr<Writer>
OverlayImpl::makeRedirectResponse(
    std::shared_ptr<PeerFinder::Slot> const& slot,
    http_request_type const& request,
    address_type remote_address)
{
    namespace http = boost::beast::http;

    http::response<json_body> response;
    response.version(request.version());
    response.result(http::status::service_unavailable);
    response.insert("Server", BuildInfo::getFullVersionString());

    std::ostringstream addressText;
    addressText << remote_address;
    response.insert("Remote-Address", addressText.str());

    response.insert("Content-Type", "application/json");
    response.insert(http::field::connection, "close");

    response.body() = Json::objectValue;
    Json::Value& peerIps = (response.body()["peer-ips"] = Json::arrayValue);
    for (auto const& endpoint : m_peerFinder->redirect(slot))
    {
        peerIps.append(endpoint.address.to_string());
    }

    response.prepare_payload();
    return std::make_shared<SimpleWriter>(response);
}

// Builds a body-less "400 Bad Request" response whose reason phrase embeds
// the supplied text.
//
// @param slot           Unused by this function.
// @param request        The request being answered (supplies HTTP version).
// @param remote_address Echoed back in the "Remote-Address" header.
// @param text           Explanation placed inside the reason phrase.
// @return A writer that serializes the response.
std::shared_ptr<Writer>
OverlayImpl::makeErrorResponse(
    std::shared_ptr<PeerFinder::Slot> const& slot,
    http_request_type const& request,
    address_type remote_address,
    std::string text)
{
    namespace http = boost::beast::http;

    http::response<http::empty_body> response;
    response.version(request.version());
    response.result(http::status::bad_request);

    std::string const reason = "Bad Request (" + text + ")";
    response.reason(reason);

    response.insert("Server", BuildInfo::getFullVersionString());
    response.insert("Remote-Address", remote_address.to_string());
    response.insert(http::field::connection, "close");

    response.prepare_payload();
    return std::make_shared<SimpleWriter>(response);
}

//------------------------------------------------------------------------------

// Starts an outbound connection attempt to the given endpoint.
//
// Nothing happens if the resource manager wants the endpoint disconnected or
// if the PeerFinder cannot provide an outbound slot. Otherwise a
// ConnectAttempt is created, registered as a child and started.
void
OverlayImpl::connect(beast::IP::Endpoint const& remote_endpoint)
{
    XRPL_ASSERT(work_, "ripple::OverlayImpl::connect : work is set");

    auto consumer = resourceManager().newOutboundEndpoint(remote_endpoint);
    if (consumer.disconnect(journal_))
    {
        JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
        return;
    }

    auto const [slot, result] = peerFinder().new_outbound_slot(remote_endpoint);
    if (!slot)
    {
        JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint
                               << ": " << to_string(result);
        return;
    }

    auto const attempt = std::make_shared<ConnectAttempt>(
        app_,
        io_context_,
        beast::IPAddressConversion::to_asio_endpoint(remote_endpoint),
        consumer,
        setup_.context,
        next_id_++,
        slot,
        app_.journal("Peer"),
        *this);

    // Register and start under the lock so the attempt cannot begin I/O
    // without being tracked in list_.
    std::lock_guard lock(mutex_);
    list_.emplace(attempt.get(), attempt);
    attempt->run();
}

//------------------------------------------------------------------------------

// Registers a peer that has already completed its handshake and is active:
// it is recorded by slot, by id and as a child, and is then started.
void
OverlayImpl::add_active(std::shared_ptr<PeerImp> const& peer)
{
    beast::WrappedSink sink{journal_.sink(), peer->prefix()};
    beast::Journal journal{sink};

    std::lock_guard lock(mutex_);

    // Track by slot.
    {
        [[maybe_unused]] bool const slotInserted =
            m_peers.emplace(peer->slot(), peer).second;
        XRPL_ASSERT(
            slotInserted,
            "ripple::OverlayImpl::add_active : peer is inserted");
    }

    // Track by peer id.
    {
        [[maybe_unused]] bool const idInserted =
            ids_.emplace(
                    std::piecewise_construct,
                    std::make_tuple(peer->id()),
                    std::make_tuple(peer))
                .second;
        XRPL_ASSERT(
            idInserted,
            "ripple::OverlayImpl::add_active : peer ID is inserted");
    }

    list_.emplace(peer.get(), peer);

    JLOG(journal.debug()) << "activated";

    // We are not on the strand here, so run() has to happen while the lock
    // is still held; otherwise new I/O could be queued after stop().
    peer->run();
}

/** Stop tracking the peer associated with a PeerFinder slot.

    Erases the slot's entry from m_peers while holding mutex_.

    @param slot The slot whose entry should be removed. It is expected to be
                present; a missing entry trips the assertion and, if the
                assertion does not halt execution, is otherwise ignored.
*/
void
OverlayImpl::remove(std::shared_ptr<PeerFinder::Slot> const& slot)
{
    std::lock_guard lock(mutex_);
    auto const iter = m_peers.find(slot);
    XRPL_ASSERT(
        iter != m_peers.end(), "ripple::OverlayImpl::remove : valid input");

    // Erasing the end iterator is undefined behaviour, so the erase must be
    // skipped whenever execution continues past a failed assertion.
    if (iter != m_peers.end())
        m_peers.erase(iter);
}

// Starts the overlay: configures and starts the PeerFinder, seeds it with
// bootstrap and fixed peer addresses (resolved asynchronously), and arms the
// once-per-second timer.
void
OverlayImpl::start()
{
    // The PeerFinder configuration is derived from the application config,
    // the overlay listening port, whether a validation public key is present,
    // and the configured IP limit.
    PeerFinder::Config config = PeerFinder::Config::makeConfig(
        app_.config(),
        serverHandler_.setup().overlay.port(),
        app_.getValidationPublicKey().has_value(),
        setup_.ipLimit);

    m_peerFinder->setConfig(config);
    m_peerFinder->start();

    // Populate our boot cache: if there are no entries in [ips] then we use
    // the entries in [ips_fixed].
    auto bootstrapIps =
        app_.config().IPS.empty() ? app_.config().IPS_FIXED : app_.config().IPS;

    // If nothing is specified, default to several well-known high-capacity
    // servers to serve as bootstrap:
    if (bootstrapIps.empty())
    {
        // Pool of servers operated by Ripple Labs Inc. - https://ripple.com
        bootstrapIps.push_back("r.ripple.com 51235");

        // Pool of servers operated by ISRDC - https://isrdc.in
        bootstrapIps.push_back("sahyadri.isrdc.in 51235");

        // Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
        bootstrapIps.push_back("hubs.xrpkuwait.com 51235");

        // Pool of servers operated by XRPL Commons - https://xrpl-commons.org
        bootstrapIps.push_back("hub.xrpl-commons.org 51235");
    }

    // Resolve the bootstrap names; each result set is handed to the
    // PeerFinder as fallback addresses under the source name
    // "config: <name>".
    m_resolver.resolve(
        bootstrapIps,
        [this](
            std::string const& name,
            std::vector<beast::IP::Endpoint> const& addresses) {
            std::vector<std::string> ips;
            ips.reserve(addresses.size());
            for (auto const& addr : addresses)
            {
                // An address without a port gets the default peer port.
                if (addr.port() == 0)
                    ips.push_back(to_string(addr.at_port(DEFAULT_PEER_PORT)));
                else
                    ips.push_back(to_string(addr));
            }

            std::string const base("config: ");
            if (!ips.empty())
                m_peerFinder->addFallbackStrings(base + name, ips);
        });

    // Add the ips_fixed from the rippled.cfg file
    if (!app_.config().standalone() && !app_.config().IPS_FIXED.empty())
    {
        m_resolver.resolve(
            app_.config().IPS_FIXED,
            [this](
                std::string const& name,
                std::vector<beast::IP::Endpoint> const& addresses) {
                std::vector<beast::IP::Endpoint> ips;
                ips.reserve(addresses.size());

                for (auto& addr : addresses)
                {
                    // An address without a port gets the default peer port.
                    if (addr.port() == 0)
                        ips.emplace_back(addr.address(), DEFAULT_PEER_PORT);
                    else
                        ips.emplace_back(addr);
                }

                if (!ips.empty())
                    m_peerFinder->addFixedPeer(name, ips);
            });
    }
    // Create the periodic timer, register it as a child and arm it, all
    // while holding the lock.
    auto const timer = std::make_shared<Timer>(*this);
    std::lock_guard lock(mutex_);
    list_.emplace(timer.get(), timer);
    timer_ = timer;
    timer->async_wait();
}

void
OverlayImpl::stop()
{
    boost::asio::dispatch(strand_, std::bind(&OverlayImpl::stopChildren, this));
    {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        cond_.wait(lock, [this] { return list_.empty(); });
    }
    m_peerFinder->stop();
}

//------------------------------------------------------------------------------
//
// PropertyStream
//
//------------------------------------------------------------------------------

// Writes the per-category traffic counters into the property stream as a
// "traffic" set with one entry per category. Counter values are emitted as
// decimal strings.
void
OverlayImpl::onWrite(beast::PropertyStream::Map& stream)
{
    beast::PropertyStream::Set trafficSet("traffic", stream);

    auto const counts = m_traffic.getCounts();
    for (auto const& entry : counts)
    {
        auto const& counters = entry.second;

        beast::PropertyStream::Map row(trafficSet);
        row["category"] = counters.name;
        row["bytes_in"] = std::to_string(counters.bytesIn.load());
        row["messages_in"] = std::to_string(counters.messagesIn.load());
        row["bytes_out"] = std::to_string(counters.bytesOut.load());
        row["messages_out"] = std::to_string(counters.messagesOut.load());
    }
}

//------------------------------------------------------------------------------
/** Record a peer that has finished connecting.

    Called once the peer handshake has completed, as part of peer
    activation; by then both the peer's address and its public key are
    known. The peer becomes findable by id from this point on.
*/
void
OverlayImpl::activate(std::shared_ptr<PeerImp> const& peer)
{
    beast::WrappedSink sink{journal_.sink(), peer->prefix()};
    beast::Journal journal{sink};

    // Start tracking the peer by id.
    {
        std::lock_guard lock(mutex_);
        [[maybe_unused]] bool const idInserted =
            ids_.emplace(
                    std::piecewise_construct,
                    std::make_tuple(peer->id()),
                    std::make_tuple(peer))
                .second;
        XRPL_ASSERT(
            idInserted,
            "ripple::OverlayImpl::activate : peer ID is inserted");
    }

    JLOG(journal.debug()) << "activated";

    // Having just accepted this peer, the active count cannot be zero.
    XRPL_ASSERT(size(), "ripple::OverlayImpl::activate : nonzero peers");
}

// Stops tracking the peer with the given id. Erasing by key means an id that
// is not present is simply ignored.
void
OverlayImpl::onPeerDeactivate(Peer::id_t id)
{
    std::lock_guard lock(mutex_);
    ids_.erase(id);
}

// Processes a batch of manifests received from a peer.
//
// Each manifest is deserialized and applied. Accepted manifests are
// published, stored in the wallet database when their master key is listed,
// and collected into a message that is relayed to all peers. Malformed
// manifests are logged and skipped.
//
// @param m    The received manifests message.
// @param from The peer that sent it (used for logging).
void
OverlayImpl::onManifests(
    std::shared_ptr<protocol::TMManifests> const& m,
    std::shared_ptr<PeerImp> const& from)
{
    auto const n = m->list_size();
    auto const& journal = from->pjournal();

    protocol::TMManifests relay;

    for (std::size_t i = 0; i < n; ++i)
    {
        auto& s = m->list().Get(i).stobject();

        auto mo = deserializeManifest(s);
        if (!mo)
        {
            JLOG(journal.debug())
                << "Malformed manifest #" << i + 1 << ": " << strHex(s);
            continue;
        }

        // Keep a copy of the serialized form: applying the manifest moves
        // from *mo.
        auto const serialized = mo->serialized;

        auto const result =
            app_.validatorManifests().applyManifest(std::move(*mo));
        if (result != ManifestDisposition::accepted)
            continue;

        relay.add_list()->set_stobject(s);

        // N.B.: the Manifest was moved out of the optional by the
        //       applyManifest call above, so it has to be rebuilt from the
        //       saved serialization before it can be used again.
        mo = deserializeManifest(serialized);
        XRPL_ASSERT(
            mo,
            "ripple::OverlayImpl::onManifests : manifest "
            "deserialization succeeded");

        app_.getOPs().pubManifest(*mo);

        if (app_.validators().listed(mo->masterKey))
        {
            auto db = app_.getWalletDB().checkoutDb();
            addValidatorManifest(*db, serialized);
        }
    }

    if (relay.list().empty())
        return;

    auto const relayMessage =
        std::make_shared<Message>(relay, protocol::mtMANIFESTS);
    for_each([relayMessage](std::shared_ptr<PeerImp>&& p) {
        p->send(relayMessage);
    });
}

// Adds `size` to the inbound counters of the given traffic category.
// The `true` argument selects the inbound direction (the outbound
// counterpart passes `false`).
void
OverlayImpl::reportInboundTraffic(TrafficCount::category cat, int size)
{
    m_traffic.addCount(cat, true, size);
}

// Adds `size` to the outbound counters of the given traffic category.
// The `false` argument selects the outbound direction (the inbound
// counterpart passes `true`).
void
OverlayImpl::reportOutboundTraffic(TrafficCount::category cat, int size)
{
    m_traffic.addCount(cat, false, size);
}
/** The number of active peers on the network
    Active peers are only those peers that have completed the handshake
    and are running the Ripple protocol.

    The value is the number of entries in ids_, read while holding mutex_.
*/
std::size_t
OverlayImpl::size() const
{
    std::lock_guard lock(mutex_);
    return ids_.size();
}

// Returns the maximum number of peers, as taken from the PeerFinder's
// current configuration (maxPeers).
int
OverlayImpl::limit()
{
    return m_peerFinder->config().maxPeers;
}

// Builds the "overlay" section of the crawl response: an object whose
// "active" array has one entry per peer visited by for_each().
//
// Every entry carries the peer's public key (base64), connection direction
// and uptime in seconds. The IP address and port are only included for peers
// whose crawl() returns true. The version and the range of complete ledgers
// are included when available.
Json::Value
OverlayImpl::getOverlayInfo()
{
    using namespace std::chrono;
    Json::Value jv;
    auto& av = jv["active"] = Json::Value(Json::arrayValue);

    for_each([&](std::shared_ptr<PeerImp>&& sp) {
        auto& pv = av.append(Json::Value(Json::objectValue));
        pv[jss::public_key] = base64_encode(
            sp->getNodePublic().data(), sp->getNodePublic().size());
        pv[jss::type] = sp->slot()->inbound() ? "in" : "out";
        pv[jss::uptime] = static_cast<std::uint32_t>(
            duration_cast<seconds>(sp->uptime()).count());
        if (sp->crawl())
        {
            pv[jss::ip] = sp->getRemoteAddress().address().to_string();
            // For inbound peers the remote port of the connection is not
            // used; the port is only reported if the slot has a listening
            // port. For outbound peers the port we connected to is reported.
            // NOTE(review): the inbound branch assigns *port as-is while the
            // outbound branch assigns a string; if listening_port() yields a
            // number, the JSON type of "port" differs between the two -
            // confirm that consumers tolerate this.
            if (sp->slot()->inbound())
            {
                if (auto port = sp->slot()->listening_port())
                    pv[jss::port] = *port;
            }
            else
            {
                pv[jss::port] = std::to_string(sp->getRemoteAddress().port());
            }
        }

        {
            auto version{sp->getVersion()};
            if (!version.empty())
                // Could move here if Json::value supported moving from strings
                pv[jss::version] = std::string{version};
        }

        // minSeq/maxSeq are out-parameters filled in by ledgerRange(); a
        // range of 0-0 is treated as "nothing to report".
        std::uint32_t minSeq, maxSeq;
        sp->ledgerRange(minSeq, maxSeq);
        if (minSeq != 0 || maxSeq != 0)
            pv[jss::complete_ledgers] =
                std::to_string(minSeq) + "-" + std::to_string(maxSeq);
    });

    return jv;
}

// Returns the server information with a number of fields stripped out.
//
// The information is requested in non-human-readable, non-admin form and
// without counters; the fields removed below are then filtered out of the
// result.
Json::Value
OverlayImpl::getServerInfo()
{
    constexpr bool humanReadable = false;
    constexpr bool admin = false;
    constexpr bool counters = false;

    Json::Value info =
        app_.getOPs().getServerInfo(humanReadable, admin, counters);

    // Top-level fields that are filtered out.
    info.removeMember(jss::hostid);
    info.removeMember(jss::load_factor_fee_escalation);
    info.removeMember(jss::load_factor_fee_queue);
    info.removeMember(jss::validation_quorum);

    // Fee and reserve details are filtered out of the validated ledger, if
    // one is reported.
    if (info.isMember(jss::validated_ledger))
    {
        Json::Value& ledger = info[jss::validated_ledger];
        ledger.removeMember(jss::base_fee);
        ledger.removeMember(jss::reserve_base_xrp);
        ledger.removeMember(jss::reserve_inc_xrp);
    }

    return info;
}

// Returns the "counts" section of the crawl response, as produced by
// getCountsJson().
// NOTE(review): the meaning of the literal 10 is defined by getCountsJson,
// which is not visible here; presumably a minimum object count - confirm.
Json::Value
OverlayImpl::getServerCounts()
{
    return getCountsJson(app_, 10);
}

// Builds the "unl" section of the crawl response: the validator list JSON
// with the per-publisher lists, signing keys, trusted validator keys and
// validation quorum removed, plus the validator sites when available.
Json::Value
OverlayImpl::getUnlInfo()
{
    Json::Value unl = app_.validators().getJson();

    // Drop the list contents from every publisher entry.
    if (unl.isMember(jss::publisher_lists))
    {
        for (auto& publisher : unl[jss::publisher_lists])
            publisher.removeMember(jss::list);
    }

    unl.removeMember(jss::signing_keys);
    unl.removeMember(jss::trusted_validator_keys);
    unl.removeMember(jss::validation_quorum);

    // Move the validator sites over, if any are reported.
    Json::Value sites = app_.validatorSites().getJson();
    if (sites.isMember(jss::validator_sites))
    {
        unl[jss::validator_sites] = std::move(sites[jss::validator_sites]);
    }

    return unl;
}

// Returns information on verified peers.
// The result is built by appending each active peer's json() in the order
// returned by getActivePeers().
// NOTE(review): with no active peers the default-constructed Json::Value is
// returned untouched, presumably a JSON null rather than an empty array -
// confirm against callers.
Json::Value
OverlayImpl::json()
{
    Json::Value json;
    for (auto const& peer : getActivePeers())
    {
        json.append(peer->json());
    }
    return json;
}

// Serves the "/crawl" endpoint.
//
// @param req     The incoming HTTP request.
// @param handoff Receives the JSON response when the request is handled.
// @return false if the target is not "/crawl" or crawling is disabled;
//         true once a response has been stored in `handoff`.
bool
OverlayImpl::processCrawl(http_request_type const& req, Handoff& handoff)
{
    if (req.target() != "/crawl")
        return false;
    if (setup_.crawlOptions == CrawlOptions::Disabled)
        return false;

    namespace http = boost::beast::http;

    http::response<json_body> msg;
    msg.version(req.version());
    msg.result(http::status::ok);
    msg.insert("Server", BuildInfo::getFullVersionString());
    msg.insert("Content-Type", "application/json");
    msg.insert("Connection", "close");

    auto& body = msg.body();
    body["version"] = Json::Value(2u);

    // Each section is included only when its option bit is set.
    if (setup_.crawlOptions & CrawlOptions::Overlay)
        body["overlay"] = getOverlayInfo();

    if (setup_.crawlOptions & CrawlOptions::ServerInfo)
        body["server"] = getServerInfo();

    if (setup_.crawlOptions & CrawlOptions::ServerCounts)
        body["counts"] = getServerCounts();

    if (setup_.crawlOptions & CrawlOptions::Unl)
        body["unl"] = getUnlInfo();

    msg.prepare_payload();
    handoff.response = std::make_shared<SimpleWriter>(msg);
    return true;
}

// Serves the "/vl/" endpoint, which returns a validator list by publisher
// key.
//
// Two target forms are accepted: "/vl/<key>" (version defaults to 1) and
// "/vl/<version>/<key>".
//
// @param req     The incoming HTTP request.
// @param handoff Receives the response when the request is handled.
// @return false if the target does not start with "/vl/" or the endpoint is
//         disabled; true once a response (success or error) has been stored
//         in `handoff`.
bool
OverlayImpl::processValidatorList(
    http_request_type const& req,
    Handoff& handoff)
{
    // If the target is in the form "/vl/<validator_list_public_key>",
    // return the most recent validator list for that key.
    constexpr std::string_view prefix("/vl/");

    if (!req.target().starts_with(prefix.data()) || !setup_.vlEnabled)
        return false;

    std::uint32_t version = 1;

    boost::beast::http::response<json_body> msg;
    msg.version(req.version());
    msg.insert("Server", BuildInfo::getFullVersionString());
    msg.insert("Content-Type", "application/json");
    msg.insert("Connection", "close");

    // Finishes the response with the given error status and a null body.
    // Always returns true so callers can write `return fail(...)`.
    auto fail = [&msg, &handoff](auto status) {
        msg.result(status);
        msg.insert("Content-Length", "0");

        msg.body() = Json::nullValue;

        msg.prepare_payload();
        handoff.response = std::make_shared<SimpleWriter>(msg);
        return true;
    };

    // Everything after "/vl/".
    std::string_view key = req.target().substr(prefix.size());

    // If a further '/' is present, the part before it is the requested
    // version and the part after it is the key.
    if (auto slash = key.find('/'); slash != std::string_view::npos)
    {
        auto verString = key.substr(0, slash);
        if (!boost::conversion::try_lexical_convert(verString, version))
            return fail(boost::beast::http::status::bad_request);
        key = key.substr(slash + 1);
    }

    if (key.empty())
        return fail(boost::beast::http::status::bad_request);

    // find the list
    auto vl = app_.validators().getAvailable(key, version);

    if (!vl)
    {
        // 404 not found
        return fail(boost::beast::http::status::not_found);
    }
    else if (!*vl)
    {
        // A result was returned but it holds no usable value; answered as a
        // bad request.
        return fail(boost::beast::http::status::bad_request);
    }
    else
    {
        msg.result(boost::beast::http::status::ok);

        msg.body() = *vl;

        msg.prepare_payload();
        handoff.response = std::make_shared<SimpleWriter>(msg);
        return true;
    }
}

// Serves the "/health" endpoint.
//
// A handful of metrics are taken from getServerInfo() and each is graded as
// healthy, warning or critical. Every metric outside its healthy range is
// reported in the "info" object of the response body, and the worst grade
// determines the HTTP status: 200 (healthy), 503 (warning) or 500
// (critical).
//
// @param req     The incoming HTTP request.
// @param handoff Receives the JSON response when the request is handled.
// @return false if the target is not "/health"; true otherwise.
bool
OverlayImpl::processHealth(http_request_type const& req, Handoff& handoff)
{
    if (req.target() != "/health")
        return false;

    namespace http = boost::beast::http;

    http::response<json_body> msg;
    msg.version(req.version());
    msg.insert("Server", BuildInfo::getFullVersionString());
    msg.insert("Content-Type", "application/json");
    msg.insert("Connection", "close");

    auto info = getServerInfo();

    // -1 stands for "no validated ledger reported".
    int const ledgerAge = info.isMember(jss::validated_ledger)
        ? info[jss::validated_ledger][jss::age].asInt()
        : -1;
    bool const amendmentBlocked = info.isMember(jss::amendment_blocked);
    int const peerCount = info[jss::peers].asInt();
    std::string const serverState = info[jss::server_state].asString();
    auto const loadFactor = info[jss::load_factor_server].asDouble() /
        info[jss::load_base].asDouble();

    enum { healthy, warning, critical };
    int health = healthy;
    // Raises the overall grade; it never lowers it.
    auto const escalate = [&health](int state) {
        if (health < state)
            health = state;
    };

    Json::Value& report = (msg.body()[jss::info] = Json::objectValue);

    // Validated ledger age: healthy below 7, critical from 20 upwards.
    // A negative age (no validated ledger) falls in the warning band.
    if (ledgerAge >= 7 || ledgerAge < 0)
    {
        report[jss::validated_ledger] = ledgerAge;
        escalate(ledgerAge < 20 ? warning : critical);
    }

    if (amendmentBlocked)
    {
        report[jss::amendment_blocked] = true;
        escalate(critical);
    }

    // Peer count: healthy above 7, critical at zero.
    if (peerCount <= 7)
    {
        report[jss::peers] = peerCount;
        escalate(peerCount != 0 ? warning : critical);
    }

    // Server state: three states are healthy, three more are warnings and
    // anything else is critical.
    bool const stateHealthy = serverState == "full" ||
        serverState == "validating" || serverState == "proposing";
    if (!stateHealthy)
    {
        report[jss::server_state] = serverState;
        bool const stateWarning = serverState == "syncing" ||
            serverState == "tracking" || serverState == "connected";
        escalate(stateWarning ? warning : critical);
    }

    // Load factor: healthy up to 100, critical from 1000 upwards.
    if (loadFactor > 100)
    {
        report[jss::load_factor] = loadFactor;
        escalate(loadFactor < 1000 ? warning : critical);
    }

    if (health == healthy)
        msg.result(http::status::ok);
    else if (health == warning)
        msg.result(http::status::service_unavailable);
    else if (health == critical)
        msg.result(http::status::internal_server_error);

    msg.prepare_payload();
    handoff.response = std::make_shared<SimpleWriter>(msg);
    return true;
}

// Offers the request to each built-in HTTP endpoint in turn (crawl,
// validator list, health) and stops at the first one that handles it.
//
// @return true if one of the endpoints stored a response in `handoff`.
bool
OverlayImpl::processRequest(http_request_type const& req, Handoff& handoff)
{
    if (processCrawl(req, handoff))
        return true;

    if (processValidatorList(req, handoff))
        return true;

    return processHealth(req, handoff);
}

// Returns every peer visited by for_each() as a sequence of shared pointers.
Overlay::PeerSequence
OverlayImpl::getActivePeers() const
{
    Overlay::PeerSequence peers;
    peers.reserve(size());

    auto const collect = [&peers](std::shared_ptr<PeerImp>&& peer) {
        peers.emplace_back(std::move(peer));
    };
    for_each(collect);

    return peers;
}

// Returns the active peers whose ids are not in `toSkip`, and reports
// statistics about the tx reduce-relay feature through the out-parameters.
//
// @param toSkip        Ids of peers to leave out of the result.
// @param active        Set to the number of tracked peer ids.
// @param disabled      Set to the number of live peers that have tx
//                      reduce-relay disabled.
// @param enabledInSkip Set to the number of skipped live peers that have tx
//                      reduce-relay enabled.
Overlay::PeerSequence
OverlayImpl::getActivePeers(
    std::set<Peer::id_t> const& toSkip,
    std::size_t& active,
    std::size_t& disabled,
    std::size_t& enabledInSkip) const
{
    Overlay::PeerSequence selected;
    std::lock_guard lock(mutex_);

    active = ids_.size();
    disabled = 0;
    enabledInSkip = 0;
    selected.reserve(ids_.size());

    // NOTE The holder lives outside the loop on purpose: it delays the
    // destruction of PeerImp.
    std::shared_ptr<PeerImp> holder;
    for (auto& [id, weak] : ids_)
    {
        holder = weak.lock();
        if (holder == nullptr)
            continue;

        bool const reduceRelayEnabled = holder->txReduceRelayEnabled();
        if (!reduceRelayEnabled)
        {
            // tx reduced relay feature disabled
            ++disabled;
        }

        bool const skipped = toSkip.count(id) != 0;
        if (!skipped)
        {
            selected.emplace_back(std::move(holder));
        }
        else if (reduceRelayEnabled)
        {
            ++enabledInSkip;
        }
    }

    return selected;
}

// Forwards the given index to checkTracking() on every peer visited by
// for_each().
void
OverlayImpl::checkTracking(std::uint32_t index)
{
    auto const notify = [index](std::shared_ptr<PeerImp>&& peer) {
        peer->checkTracking(index);
    };
    for_each(notify);
}

// Looks up a peer by its id.
//
// @return The peer, or an empty pointer if the id is not tracked or the
//         peer object no longer exists.
std::shared_ptr<Peer>
OverlayImpl::findPeerByShortID(Peer::id_t const& id) const
{
    std::lock_guard lock(mutex_);

    if (auto const found = ids_.find(id); found != ids_.end())
        return found->second.lock();

    return {};
}

// Find the live peer whose node public key equals pubKey, or return an
// empty pointer if there is none. The scan runs while holding mutex_.
//
// This is a linear search on purpose: a hash map keyed by public key was
// not used because the cost of updating it on every peer connect and
// disconnect outweighs the cost of searching a small set linearly.
std::shared_ptr<Peer>
OverlayImpl::findPeerByPublicKey(PublicKey const& pubKey)
{
    std::lock_guard guard(mutex_);

    // NOTE Declared outside the loop to delay the destruction of PeerImp
    std::shared_ptr<PeerImp> candidate;
    for (auto const& entry : ids_)
    {
        candidate = entry.second.lock();
        if (candidate == nullptr)
            continue;

        if (candidate->getNodePublic() == pubKey)
            return candidate;
    }

    return {};
}

// Wrap the proposal in a single mtPROPOSE_LEDGER message and send that
// same message to every peer visited by for_each.
void
OverlayImpl::broadcast(protocol::TMProposeSet& m)
{
    auto const message =
        std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);

    for_each([&message](std::shared_ptr<PeerImp>&& peer) {
        peer->send(message);
    });
}

// Relay a proposal to every peer that is not in the skip set reported by
// HashRouter::shouldRelay(uid).
//
// Returns that skip set. When shouldRelay yields no value, nothing is
// sent and an empty set is returned.
std::set<Peer::id_t>
OverlayImpl::relay(
    protocol::TMProposeSet& m,
    uint256 const& uid,
    PublicKey const& validator)
{
    auto const skip = app_.getHashRouter().shouldRelay(uid);
    if (!skip)
        return {};

    auto const message =
        std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);

    for_each([&skip, &message](std::shared_ptr<PeerImp>&& peer) {
        bool const excluded = skip->find(peer->id()) != skip->end();
        if (!excluded)
            peer->send(message);
    });

    return *skip;
}

// Wrap the validation in a single mtVALIDATION message and send that
// same message to every peer visited by for_each.
void
OverlayImpl::broadcast(protocol::TMValidation& m)
{
    auto const message = std::make_shared<Message>(m, protocol::mtVALIDATION);

    // The lambda keeps its own copy of the shared pointer.
    for_each([message](std::shared_ptr<PeerImp>&& peer) {
        peer->send(message);
    });
}

// Relay a validation to every peer that is not in the skip set reported
// by HashRouter::shouldRelay(uid).
//
// Returns that skip set. When shouldRelay yields no value, nothing is
// sent and an empty set is returned.
std::set<Peer::id_t>
OverlayImpl::relay(
    protocol::TMValidation& m,
    uint256 const& uid,
    PublicKey const& validator)
{
    auto const skip = app_.getHashRouter().shouldRelay(uid);
    if (!skip)
        return {};

    auto const message =
        std::make_shared<Message>(m, protocol::mtVALIDATION, validator);

    for_each([&skip, &message](std::shared_ptr<PeerImp>&& peer) {
        bool const excluded = skip->find(peer->id()) != skip->end();
        if (!excluded)
            peer->send(message);
    });

    return *skip;
}

std::shared_ptr<Message>
OverlayImpl::getManifestsMessage()
{
    std::lock_guard g(manifestLock_);

    if (auto seq = app_.validatorManifests().sequence();
        seq != manifestListSeq_)
    {
        protocol::TMManifests tm;

        app_.validatorManifests().for_each_manifest(
            [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
            [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
                tm.add_list()->set_stobject(
                    manifest.serialized.data(), manifest.serialized.size());
                hr.addSuppression(manifest.hash());
            });

        manifestMessage_.reset();

        if (tm.list_size() != 0)
            manifestMessage_ =
                std::make_shared<Message>(tm, protocol::mtMANIFESTS);

        manifestListSeq_ = seq;
    }

    return manifestMessage_;
}

// Relay a transaction to the active peers, or only hand them its hash.
//
//   hash    hash of the transaction; passed to Peer::addTxQueue for every
//           selected peer that is not sent the full message
//   tx      the transaction message, if available
//   toSkip  peers excluded from the selection (see getActivePeers)
//
// Behavior:
//   * If tx is present but an STTx cannot be constructed from it, nothing
//     is relayed.
//   * If tx is absent or is a pseudo transaction, the message is never
//     sent. When TX_REDUCE_RELAY_ENABLE is set, the hash is queued on all
//     selected peers; otherwise nothing happens.
//   * Otherwise, if reduce-relay is off or the number of peers does not
//     exceed the minimum, the message is sent to every selected peer.
//   * Otherwise the message is sent to every peer with the feature
//     disabled and to enabled peers until the target count is reached;
//     the remaining enabled peers only get the hash queued.
void
OverlayImpl::relay(
    uint256 const& hash,
    std::optional<std::reference_wrapper<protocol::TMTransaction>> tx,
    std::set<Peer::id_t> const& toSkip)
{
    bool relay = tx.has_value();
    if (relay)
    {
        auto& txn = tx->get();
        SerialIter sit(makeSlice(txn.rawtransaction()));
        try
        {
            relay = !isPseudoTx(STTx{sit});
        }
        catch (std::exception const&)
        {
            // Could not construct STTx, not relaying
            JLOG(journal_.debug()) << "Could not construct STTx: " << hash;
            return;
        }
    }

    Overlay::PeerSequence peers = {};
    std::size_t total = 0;
    std::size_t disabled = 0;
    std::size_t enabledInSkip = 0;

    if (!relay)
    {
        // Without the full transaction only the hash can be handed out,
        // and only when reduce-relay is enabled.
        if (!app_.config().TX_REDUCE_RELAY_ENABLE)
            return;

        peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
        JLOG(journal_.trace())
            << "not relaying tx, total peers " << peers.size();
        for (auto const& p : peers)
            p->addTxQueue(hash);
        return;
    }

    auto& txn = tx->get();
    auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
    peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
    auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;

    if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
    {
        // Reduce-relay is off, or there are too few peers to be
        // selective: send to everyone selected.
        for (auto const& p : peers)
            p->send(sm);
        if (app_.config().TX_REDUCE_RELAY_ENABLE ||
            app_.config().TX_REDUCE_RELAY_METRICS)
            txMetrics_.addMetrics(total, toSkip.size(), 0);
        return;
    }

    // We have more peers than the minimum (disabled + minimum enabled),
    // relay to all disabled and some randomly selected enabled that
    // do not have the transaction.
    auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
        (total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;

    txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);

    // Shuffling is only needed if the skipped peers do not already
    // satisfy the quota.
    if (enabledTarget > enabledInSkip)
        std::shuffle(peers.begin(), peers.end(), default_prng());

    JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size()
                           << " selected " << enabledTarget << " skip "
                           << toSkip.size() << " disabled " << disabled;

    // count skipped peers with the enabled feature towards the quota.
    // The counter has the same type as enabledInSkip so that it is
    // neither truncated on initialization nor able to wrap before it
    // reaches the target.
    std::size_t enabledAndRelayed = enabledInSkip;
    for (auto const& p : peers)
    {
        // always relay to a peer with the disabled feature
        if (!p->txReduceRelayEnabled())
        {
            p->send(sm);
        }
        else if (enabledAndRelayed < enabledTarget)
        {
            ++enabledAndRelayed;
            p->send(sm);
        }
        else
        {
            p->addTxQueue(hash);
        }
    }
}

//------------------------------------------------------------------------------

// Unregister a child from list_ while holding mutex_. Once the last
// child is gone, wake everything waiting on cond_.
void
OverlayImpl::remove(Child& child)
{
    std::lock_guard guard(mutex_);

    list_.erase(&child);
    if (!list_.empty())
        return;

    cond_.notify_all();
}

void
OverlayImpl::stopChildren()
{
    // Calling list_[].second->stop() may cause list_ to be modified
    // (OverlayImpl::remove() may be called on this same thread).  So
    // iterating directly over list_ to call child->stop() could lead to
    // undefined behavior.
    //
    // Therefore we copy all of the weak/shared ptrs out of list_ before we
    // start calling stop() on them.  That guarantees OverlayImpl::remove()
    // won't be called until vector<> children leaves scope.
    std::vector<std::shared_ptr<Child>> children;
    {
        std::lock_guard lock(mutex_);
        if (!work_)
            return;
        work_ = std::nullopt;

        children.reserve(list_.size());
        for (auto const& element : list_)
        {
            children.emplace_back(element.second.lock());
        }
    }  // lock released

    for (auto const& child : children)
    {
        if (child != nullptr)
            child->stop();
    }
}

// Ask the peer finder which addresses to connect to and start a
// connection to each of them, in the order returned.
void
OverlayImpl::autoConnect()
{
    auto const targets = m_peerFinder->autoconnect();
    for (auto const& endpoint : targets)
    {
        connect(endpoint);
    }
}

// For every (slot, endpoints) entry produced by the peer finder, find
// the peer registered for that slot and send it the endpoints. Entries
// whose slot is unknown or whose peer is gone are ignored.
void
OverlayImpl::sendEndpoints()
{
    auto const batches = m_peerFinder->buildEndpointsForPeers();
    for (auto const& batch : batches)
    {
        // mutex_ is held only for the lookup, not while sending
        auto const peer = [&]() -> std::shared_ptr<PeerImp> {
            std::lock_guard guard(mutex_);
            if (auto const pos = m_peers.find(batch.first);
                pos != m_peers.end())
                return pos->second.lock();
            return {};
        }();

        if (peer)
            peer->sendEndpoints(batch.second.begin(), batch.second.end());
    }
}

// Call sendTxQueue() on every peer visited by for_each for which
// txReduceRelayEnabled() is true.
void
OverlayImpl::sendTxQueue()
{
    for_each([](auto const& peer) {
        if (!peer->txReduceRelayEnabled())
            return;

        peer->sendTxQueue();
    });
}

// Build an mtSQUELCH message for the given validator key.
//
//   validator        key bytes stored in the message
//   squelch          value of the message's squelch flag
//   squelchDuration  stored in the message only when squelch is true
std::shared_ptr<Message>
makeSquelchMessage(
    PublicKey const& validator,
    bool squelch,
    uint32_t squelchDuration)
{
    protocol::TMSquelch request;

    request.set_validatorpubkey(validator.data(), validator.size());
    request.set_squelch(squelch);

    // The duration field is left unset for an unsquelch
    if (squelch)
    {
        request.set_squelchduration(squelchDuration);
    }

    return std::make_shared<Message>(request, protocol::mtSQUELCH);
}

// Send an unsquelch message for `validator` to the peer with short id
// `id`. Does nothing if no such peer is found.
void
OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const
{
    auto const peer = findPeerByShortID(id);
    if (!peer)
        return;

    // Possible optimization: several messages, each for a different
    // validator, might be sent to the same peer
    peer->send(makeSquelchMessage(validator, false, 0));
}

// Send a squelch message for `validator`, carrying squelchDuration, to
// the peer with short id `id`. Does nothing if no such peer is found.
void
OverlayImpl::squelch(
    PublicKey const& validator,
    Peer::id_t id,
    uint32_t squelchDuration) const
{
    auto const peer = findPeerByShortID(id);
    if (!peer)
        return;

    peer->send(makeSquelchMessage(validator, true, squelchDuration));
}

// Update the squelch slots for a message that was received from each of
// the peers in `peers`.
//
// Does nothing unless slots_.baseSquelchReady() is true. If the caller
// is not running on strand_, the call is re-posted to the strand and
// this invocation returns immediately.
void
OverlayImpl::updateSlotAndSquelch(
    uint256 const& key,
    PublicKey const& validator,
    std::set<Peer::id_t>&& peers,
    protocol::MessageType type)
{
    if (!slots_.baseSquelchReady())
        return;

    if (!strand_.running_in_this_thread())
    {
        // The reference parameters (key, validator) must be copied into
        // the closure; the peer set is moved in and moved out again.
        post(
            strand_,
            [this,
             keyCopy = key,
             validatorCopy = validator,
             peerIds = std::move(peers),
             type]() mutable {
                updateSlotAndSquelch(
                    keyCopy, validatorCopy, std::move(peerIds), type);
            });
        return;
    }

    for (auto const peerId : peers)
    {
        slots_.updateSlotAndSquelch(key, validator, peerId, type, [this]() {
            reportInboundTraffic(TrafficCount::squelch_ignored, 0);
        });
    }
}

// Update the squelch slot for a message that was received from a single
// peer.
//
// Does nothing unless slots_.baseSquelchReady() is true. If the caller
// is not running on strand_, the call is re-posted to the strand and
// this invocation returns immediately.
void
OverlayImpl::updateSlotAndSquelch(
    uint256 const& key,
    PublicKey const& validator,
    Peer::id_t peer,
    protocol::MessageType type)
{
    if (!slots_.baseSquelchReady())
        return;

    if (!strand_.running_in_this_thread())
    {
        // The reference parameters (key, validator) must be copied into
        // the closure.
        post(
            strand_,
            [this, keyCopy = key, validatorCopy = validator, peer, type]() {
                updateSlotAndSquelch(keyCopy, validatorCopy, peer, type);
            });
        return;
    }

    slots_.updateSlotAndSquelch(key, validator, peer, type, [this]() {
        reportInboundTraffic(TrafficCount::squelch_ignored, 0);
    });
}

// Remove the peer with the given id from the squelch slots. If the
// caller is not running on strand_, the call is re-posted to the strand.
void
OverlayImpl::deletePeer(Peer::id_t id)
{
    if (!strand_.running_in_this_thread())
    {
        post(strand_, [this, id]() { deletePeer(id); });
        return;
    }

    slots_.deletePeer(id, true);
}

// Ask the squelch slots to delete their idle peers. If the caller is not
// running on strand_, the call is re-posted to the strand.
void
OverlayImpl::deleteIdlePeers()
{
    if (!strand_.running_in_this_thread())
    {
        post(strand_, [this]() { deleteIdlePeers(); });
        return;
    }

    slots_.deleteIdlePeers();
}

//------------------------------------------------------------------------------

// Build an Overlay::Setup from the configuration.
//
// Reads the [overlay], [crawl] and [vl] sections and the legacy
// [network_id] value. Throws std::runtime_error when the IP limit is
// negative, the public IP does not parse or is private, the [crawl]
// section is malformed, or the network id is not usable.
Overlay::Setup
setup_Overlay(BasicConfig const& config)
{
    Overlay::Setup setup;

    // [overlay]: SSL context, "ip_limit" and "public_ip"
    {
        auto const& overlaySection = config.section("overlay");
        setup.context = make_SSLContext("");

        set(setup.ipLimit, "ip_limit", overlaySection);
        if (setup.ipLimit < 0)
            Throw<std::runtime_error>("Configured IP limit is invalid");

        std::string publicIp;
        set(publicIp, "public_ip", overlaySection);
        if (!publicIp.empty())
        {
            boost::system::error_code ec;
            setup.public_ip = boost::asio::ip::make_address(publicIp, ec);
            if (ec || beast::IP::is_private(setup.public_ip))
                Throw<std::runtime_error>("Configured public IP is invalid");
        }
    }

    // [crawl]: at most one bare value switching crawling on or off, plus
    // per-feature keys
    {
        auto const& crawlSection = config.section("crawl");
        auto const& bareValues = crawlSection.values();

        if (bareValues.size() > 1)
        {
            Throw<std::runtime_error>(
                "Configured [crawl] section is invalid, too many values");
        }

        // Crawling stays enabled when no bare value is given
        bool crawlEnabled = true;

        // Only allow "0|1" as a value
        if (bareValues.size() == 1)
        {
            try
            {
                crawlEnabled = boost::lexical_cast<bool>(bareValues.front());
            }
            catch (boost::bad_lexical_cast const&)
            {
                Throw<std::runtime_error>(
                    "Configured [crawl] section has invalid value: " +
                    bareValues.front());
            }
        }

        if (crawlEnabled)
        {
            // Set `bit` in crawlOptions when `key` (or its fallback, if
            // the key is absent) is true
            auto const enableWhen = [&setup, &crawlSection](
                                        char const* key,
                                        bool fallback,
                                        std::uint32_t bit) {
                if (get<bool>(crawlSection, key, fallback))
                    setup.crawlOptions |= bit;
            };

            enableWhen("overlay", true, CrawlOptions::Overlay);
            enableWhen("server", true, CrawlOptions::ServerInfo);
            enableWhen("counts", false, CrawlOptions::ServerCounts);
            enableWhen("unl", true, CrawlOptions::Unl);
        }
    }

    // [vl]: "enabled"
    {
        auto const& vlSection = config.section("vl");

        set(setup.vlEnabled, "enabled", vlSection);
    }

    // [network_id]: a number, or one of the well-known network names
    try
    {
        auto networkId = config.legacy("network_id");

        if (!networkId.empty())
        {
            if (networkId == "main")
                networkId = "0";
            else if (networkId == "testnet")
                networkId = "1";
            else if (networkId == "devnet")
                networkId = "2";

            setup.networkID =
                beast::lexicalCastThrow<std::uint32_t>(networkId);
        }
    }
    catch (...)
    {
        // Any failure above is reported as one configuration error
        Throw<std::runtime_error>(
            "Configured [network_id] section is invalid: must be a number "
            "or one of the strings 'main', 'testnet' or 'devnet'.");
    }

    return setup;
}

// Factory: construct an OverlayImpl from the given collaborators and
// return it through the Overlay interface. All arguments are forwarded
// to the OverlayImpl constructor unchanged and in the same order.
std::unique_ptr<Overlay>
make_Overlay(
    Application& app,
    Overlay::Setup const& setup,
    ServerHandler& serverHandler,
    Resource::Manager& resourceManager,
    Resolver& resolver,
    boost::asio::io_context& io_context,
    BasicConfig const& config,
    beast::insight::Collector::ptr const& collector)
{
    std::unique_ptr<Overlay> overlay = std::make_unique<OverlayImpl>(
        app,
        setup,
        serverHandler,
        resourceManager,
        resolver,
        io_context,
        config,
        collector);

    return overlay;
}

}  // namespace ripple
